/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package actors

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"hash/fnv"
	"net"
	nethttp "net/http"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	clocklib "github.com/benbjohnson/clock"
	"github.com/cenkalti/backoff/v4"
	"github.com/google/uuid"
	"github.com/mitchellh/mapstructure"
	"github.com/valyala/fasthttp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/dapr/components-contrib/state"
	"github.com/dapr/dapr/pkg/actors/internal"
	"github.com/dapr/dapr/pkg/channel"
	"github.com/dapr/dapr/pkg/concurrency"
	configuration "github.com/dapr/dapr/pkg/config"
	daprCredentials "github.com/dapr/dapr/pkg/credentials"
	diag "github.com/dapr/dapr/pkg/diagnostics"
	diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
	"github.com/dapr/dapr/pkg/health"
	invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
	"github.com/dapr/dapr/pkg/modes"
	commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
	internalv1pb "github.com/dapr/dapr/pkg/proto/internals/v1"
	"github.com/dapr/dapr/pkg/resiliency"
	"github.com/dapr/dapr/pkg/retry"
	"github.com/dapr/kit/logger"
	timeutils "github.com/dapr/kit/time"
)

const (
	daprSeparator        = "||"
	metadataPartitionKey = "partitionKey"
	metadataZeroID       = "00000000-0000-0000-0000-000000000000"
)

var log = logger.NewLogger("dapr.runtime.actor")

// Actors allow calling into virtual actors as well as actor state management.
//
//nolint:interfacebloat
type Actors interface {
	Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
	Init() error
	Stop()
	GetState(ctx context.Context, req *GetStateRequest) (*StateResponse, error)
	TransactionalStateOperation(ctx context.Context, req *TransactionalRequest) error
	GetReminder(ctx context.Context, req *GetReminderRequest) (*Reminder, error)
	CreateReminder(ctx context.Context, req *CreateReminderRequest) error
	DeleteReminder(ctx context.Context, req *DeleteReminderRequest) error
	RenameReminder(ctx context.Context, req *RenameReminderRequest) error
	CreateTimer(ctx context.Context, req *CreateTimerRequest) error
	DeleteTimer(ctx context.Context, req *DeleteTimerRequest) error
	IsActorHosted(ctx context.Context, req *ActorHostedRequest) bool
	GetActiveActorsCount(ctx context.Context) []ActiveActorsCount
	RegisterInternalActor(ctx context.Context, actorType string, actor InternalActor) error
}

// PlacementService allows for interacting with the actor placement service.
type PlacementService interface {
	Start()
	Stop()
	WaitUntilPlacementTableIsReady(ctx context.Context) error
	LookupActor(actorType, actorID string) (host string, appID string)
	AddHostedActorType(actorType string) error
}

// GRPCConnectionFn is the type of the function that returns a gRPC connection
type GRPCConnectionFn func(ctx context.Context, address string, id string, namespace string, customOpts ...grpc.DialOption) (*grpc.ClientConn, func(destroy bool), error)

type actorsRuntime struct {
	appChannel             channel.AppChannel
	store                  state.Store
	transactionalStore     state.TransactionalStore
	placement              PlacementService
	grpcConnectionFn       GRPCConnectionFn
	config                 Config
	actorsTable            *sync.Map
	activeTimers           *sync.Map
	activeTimersLock       *sync.RWMutex
	activeReminders        *sync.Map
	remindersLock          *sync.RWMutex
	remindersMigrationLock *sync.Mutex
	activeRemindersLock    *sync.RWMutex
	reminders              map[string][]actorReminderReference
	evaluationLock         *sync.RWMutex
	evaluationChan         chan struct{}
	appHealthy             *atomic.Bool
	certChain              *daprCredentials.CertChain
	tracingSpec            configuration.TracingSpec
	resiliency             resiliency.Provider
	storeName              string
	ctx                    context.Context
	cancel                 context.CancelFunc
	clock                  clocklib.Clock
	internalActors         map[string]InternalActor
	internalActorChannel   *internalActorChannel
}

// ActiveActorsCount contain actorType and count of actors each type has.
type ActiveActorsCount struct {
	Type  string `json:"type"`
	Count int    `json:"count"`
}

// ActorMetadata represents information about the actor type.
type ActorMetadata struct {
	ID                string                 `json:"id"`
	RemindersMetadata ActorRemindersMetadata `json:"actorRemindersMetadata"`
	Etag              *string                `json:"-"`
}

// ActorRemindersMetadata represents information about actor's reminders.
type ActorRemindersMetadata struct {
	PartitionCount int                `json:"partitionCount"`
	partitionsEtag map[uint32]*string `json:"-"`
}

type actorReminderReference struct {
	actorMetadataID           string
	actorRemindersPartitionID uint32
	reminder                  Reminder
}

var ErrIncompatibleStateStore = errors.New("actor state store does not exist, or does not support transactions which are required to save state - please see https://docs.dapr.io/operations/components/setup-state-store/supported-state-stores/")

var ErrDaprResponseHeader = errors.New("error indicated via actor header response")

var ErrReminderCanceled = errors.New("reminder has been canceled")

// ActorsOpts contains options for NewActors.
type ActorsOpts struct {
	StateStore       state.Store
	AppChannel       channel.AppChannel
	GRPCConnectionFn GRPCConnectionFn
	Config           Config
	CertChain        *daprCredentials.CertChain
	TracingSpec      configuration.TracingSpec
	Resiliency       resiliency.Provider
	StateStoreName   string

	// MockPlacement is a placement service implementation used for testing
	MockPlacement PlacementService
}

// NewActors create a new actors runtime with given config.
func NewActors(opts ActorsOpts) Actors {
	return newActorsWithClock(opts, clocklib.New())
}

func newActorsWithClock(opts ActorsOpts, clock clocklib.Clock) Actors {
	var transactionalStore state.TransactionalStore
	if opts.StateStore != nil {
		features := opts.StateStore.Features()
		if state.FeatureETag.IsPresent(features) && state.FeatureTransactional.IsPresent(features) {
			transactionalStore = opts.StateStore.(state.TransactionalStore)
		}
	}

	appHealthy := &atomic.Bool{}
	appHealthy.Store(true)
	ctx, cancel := context.WithCancel(context.Background())
	return &actorsRuntime{
		store:                  opts.StateStore,
		appChannel:             opts.AppChannel,
		grpcConnectionFn:       opts.GRPCConnectionFn,
		config:                 opts.Config,
		certChain:              opts.CertChain,
		tracingSpec:            opts.TracingSpec,
		resiliency:             opts.Resiliency,
		storeName:              opts.StateStoreName,
		placement:              opts.MockPlacement,
		transactionalStore:     transactionalStore,
		actorsTable:            &sync.Map{},
		activeTimers:           &sync.Map{},
		activeTimersLock:       &sync.RWMutex{},
		activeReminders:        &sync.Map{},
		remindersLock:          &sync.RWMutex{},
		remindersMigrationLock: &sync.Mutex{},
		activeRemindersLock:    &sync.RWMutex{},
		reminders:              map[string][]actorReminderReference{},
		evaluationLock:         &sync.RWMutex{},
		evaluationChan:         make(chan struct{}, 1),
		appHealthy:             appHealthy,
		ctx:                    ctx,
		cancel:                 cancel,
		clock:                  clock,
		internalActors:         map[string]InternalActor{},
		internalActorChannel:   newInternalActorChannel(),
	}
}

func (a *actorsRuntime) haveCompatibleStorage() bool {
	if a.store == nil {
		// If we have hosted actors and no store, we can't initialize the actor runtime
		return false
	}

	features := a.store.Features()
	return state.FeatureETag.IsPresent(features) && state.FeatureTransactional.IsPresent(features)
}

func (a *actorsRuntime) Init() error {
	if len(a.config.PlacementAddresses) == 0 {
		return errors.New("actors: couldn't connect to placement service: address is empty")
	}

	if len(a.config.HostedActorTypes) > 0 {
		if !a.haveCompatibleStorage() {
			return ErrIncompatibleStateStore
		}
	}

	hostname := net.JoinHostPort(a.config.HostAddress, strconv.Itoa(a.config.Port))

	afterTableUpdateFn := func() {
		a.drainRebalancedActors()
		a.evaluateReminders()
	}
	appHealthFn := func() bool { return a.appHealthy.Load() }

	if a.placement == nil {
		a.placement = internal.NewActorPlacement(
			a.config.PlacementAddresses, a.certChain,
			a.config.AppID, hostname, a.config.HostedActorTypes,
			appHealthFn,
			afterTableUpdateFn)
	}

	go a.placement.Start()
	go a.deactivationTicker(a.config, a.deactivateActor)

	log.Infof("actor runtime started. actor idle timeout: %s. actor scan interval: %s",
		a.config.ActorIdleTimeout.String(), a.config.ActorDeactivationScanInterval.String())

	// Be careful to configure healthz endpoint option. If app healthz returns unhealthy status, Dapr will
	// disconnect from placement to remove the node from consistent hashing ring.
	// i.e if app is busy state, the healthz status would be flaky, which leads to frequent
	// actor rebalancing. It will impact the entire service.
	go a.startAppHealthCheck(
		health.WithFailureThreshold(4),
		health.WithInterval(5*time.Second),
		health.WithRequestTimeout(2*time.Second))

	return nil
}

func (a *actorsRuntime) startAppHealthCheck(opts ...health.Option) {
	if len(a.config.HostedActorTypes) == 0 || a.appChannel == nil {
		return
	}

	ch := health.StartEndpointHealthCheck(a.ctx, a.appChannel.GetBaseAddress()+"/healthz", opts...)
	for {
		select {
		case <-a.ctx.Done():
			break
		case appHealthy := <-ch:
			a.appHealthy.Store(appHealthy)
		}
	}
}

func constructCompositeKey(keys ...string) string {
	return strings.Join(keys, daprSeparator)
}

func (a *actorsRuntime) deactivateActor(actorType, actorID string) error {
	req := invokev1.NewInvokeMethodRequest("actors/"+actorType+"/"+actorID).
		WithActor(actorType, actorID).
		WithHTTPExtension(nethttp.MethodDelete, "").
		WithContentType(invokev1.JSONContentType)
	defer req.Close()

	// TODO Propagate context.
	ctx := context.TODO()

	resp, err := a.getAppChannel(actorType).InvokeMethod(ctx, req)
	if err != nil {
		diag.DefaultMonitoring.ActorDeactivationFailed(actorType, "invoke")
		return err
	}
	defer resp.Close()

	if resp.Status().Code != nethttp.StatusOK {
		diag.DefaultMonitoring.ActorDeactivationFailed(actorType, "status_code_"+strconv.FormatInt(int64(resp.Status().Code), 10))
		body, _ := resp.RawDataFull()
		return fmt.Errorf("error from actor service: %s", string(body))
	}

	a.removeActorFromTable(actorType, actorID)
	diag.DefaultMonitoring.ActorDeactivated(actorType)
	log.Debugf("deactivated actor type=%s, id=%s\n", actorType, actorID)

	return nil
}

func (a *actorsRuntime) removeActorFromTable(actorType, actorID string) {
	a.actorsTable.Delete(constructCompositeKey(actorType, actorID))
}

func (a *actorsRuntime) getActorTypeAndIDFromKey(key string) (string, string) {
	arr := strings.Split(key, daprSeparator)
	return arr[0], arr[1]
}

type deactivateFn = func(actorType string, actorID string) error

func (a *actorsRuntime) deactivationTicker(configuration Config, deactivateFn deactivateFn) {
	ticker := a.clock.Ticker(configuration.ActorDeactivationScanInterval)
	defer ticker.Stop()

	for {
		select {
		case t := <-ticker.C:
			a.actorsTable.Range(func(key, value interface{}) bool {
				actorInstance := value.(*actor)

				if actorInstance.isBusy() {
					return true
				}

				durationPassed := t.Sub(actorInstance.lastUsedTime)
				if durationPassed >= configuration.GetIdleTimeoutForType(actorInstance.actorType) {
					go func(actorKey string) {
						actorType, actorID := a.getActorTypeAndIDFromKey(actorKey)
						err := deactivateFn(actorType, actorID)
						if err != nil {
							log.Errorf("failed to deactivate actor %s: %s", actorKey, err)
						}
					}(key.(string))
				}

				return true
			})
		case <-a.ctx.Done():
			return
		}
	}
}

type lookupActorRes struct {
	targetActorAddress string
	appID              string
}

func (a *actorsRuntime) Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	err := a.placement.WaitUntilPlacementTableIsReady(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to wait for placement table readiness: %w", err)
	}

	actor := req.Actor()
	// Retry here to allow placement table dissemination/rebalancing to happen.
	policyDef := a.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries)
	policyRunner := resiliency.NewRunner[*lookupActorRes](ctx, policyDef)
	lar, err := policyRunner(func(ctx context.Context) (*lookupActorRes, error) {
		rAddr, rAppID := a.placement.LookupActor(actor.GetActorType(), actor.GetActorId())
		if rAddr == "" {
			return nil, fmt.Errorf("error finding address for actor type %s with id %s", actor.GetActorType(), actor.GetActorId())
		}
		return &lookupActorRes{
			targetActorAddress: rAddr,
			appID:              rAppID,
		}, nil
	})
	if err != nil {
		return nil, err
	}
	if lar == nil {
		lar = &lookupActorRes{}
	}

	var resp *invokev1.InvokeMethodResponse
	if a.isActorLocal(lar.targetActorAddress, a.config.HostAddress, a.config.Port) {
		resp, err = a.callLocalActor(ctx, req)
	} else {
		resp, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, lar.targetActorAddress, lar.appID, req)
	}

	if err != nil {
		if errors.Is(err, ErrDaprResponseHeader) {
			// We return the response to maintain the .NET Actor contract which communicates errors via the body, but resiliency needs the error to retry.
			return resp, err
		}
		if resp != nil {
			resp.Close()
		}
		return nil, err
	}
	return resp, nil
}

// callRemoteActorWithRetry will call a remote actor for the specified number of retries and will only retry in the case of transient failures.
func (a *actorsRuntime) callRemoteActorWithRetry(
	ctx context.Context,
	numRetries int,
	backoffInterval time.Duration,
	fn func(ctx context.Context, targetAddress, targetID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, func(destroy bool), error),
	targetAddress, targetID string, req *invokev1.InvokeMethodRequest,
) (*invokev1.InvokeMethodResponse, error) {
	if !a.resiliency.PolicyDefined(req.Actor().ActorType, resiliency.ActorPolicy{}) {
		// This policy has built-in retries so enable replay in the request
		req.WithReplay(true)
		policyRunner := resiliency.NewRunnerWithOptions(ctx,
			a.resiliency.BuiltInPolicy(resiliency.BuiltInActorRetries),
			resiliency.RunnerOpts[*invokev1.InvokeMethodResponse]{
				Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
			},
		)
		attempts := atomic.Int32{}
		return policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
			attempt := attempts.Add(1)
			rResp, teardown, rErr := fn(ctx, targetAddress, targetID, req)
			if rErr == nil {
				teardown(false)
				return rResp, nil
			}

			code := status.Code(rErr)
			if code == codes.Unavailable || code == codes.Unauthenticated {
				// Destroy the connection and force a re-connection on the next attempt
				teardown(true)
				return rResp, fmt.Errorf("failed to invoke target %s after %d retries. Error: %w", targetAddress, attempt-1, rErr)
			}

			teardown(false)
			return rResp, backoff.Permanent(rErr)
		})
	}

	resp, teardown, err := fn(ctx, targetAddress, targetID, req)
	teardown(false)
	return resp, err
}

func (a *actorsRuntime) getOrCreateActor(actorType, actorID string) *actor {
	key := constructCompositeKey(actorType, actorID)

	// This avoids allocating multiple actor allocations by calling newActor
	// whenever actor is invoked. When storing actor key first, there is a chance to
	// call newActor, but this is trivial.
	val, ok := a.actorsTable.Load(key)
	if !ok {
		val, _ = a.actorsTable.LoadOrStore(key, newActor(actorType, actorID, a.config.GetReentrancyForType(actorType).MaxStackDepth, a.clock))
	}

	return val.(*actor)
}

func (a *actorsRuntime) callLocalActor(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	actorTypeID := req.Actor()

	act := a.getOrCreateActor(actorTypeID.GetActorType(), actorTypeID.GetActorId())

	// Reentrancy to determine how we lock.
	var reentrancyID *string
	if a.config.GetReentrancyForType(act.actorType).Enabled {
		if headerValue, ok := req.Metadata()["Dapr-Reentrancy-Id"]; ok {
			reentrancyID = &headerValue.GetValues()[0]
		} else {
			reentrancyHeader := fasthttp.RequestHeader{}
			uuidObj, err := uuid.NewRandom()
			if err != nil {
				return nil, fmt.Errorf("failed to generate UUID: %w", err)
			}
			uuid := uuidObj.String()
			reentrancyHeader.Add("Dapr-Reentrancy-Id", uuid)
			req.AddHeaders(&reentrancyHeader)
			reentrancyID = &uuid
		}
	}

	err := act.lock(reentrancyID)
	if err != nil {
		return nil, status.Error(codes.ResourceExhausted, err.Error())
	}
	defer act.unlock()

	// Replace method to actors method.
	msg := req.Message()
	originalMethod := msg.Method
	msg.Method = "actors/" + actorTypeID.ActorType + "/" + actorTypeID.ActorId + "/method/" + msg.Method

	// Reset the method so we can perform retries.
	defer func() {
		msg.Method = originalMethod
	}()

	// Original code overrides method with PUT. Why?
	if msg.GetHttpExtension() == nil {
		req.WithHTTPExtension(nethttp.MethodPut, "")
	} else {
		msg.HttpExtension.Verb = commonv1pb.HTTPExtension_PUT //nolint:nosnakecase
	}

	policyDef := a.resiliency.ActorPostLockPolicy(act.actorType, act.actorID)

	// If the request can be retried, we need to enable replaying
	if policyDef != nil && policyDef.HasRetries() {
		req.WithReplay(true)
	}

	policyRunner := resiliency.NewRunnerWithOptions(ctx, policyDef,
		resiliency.RunnerOpts[*invokev1.InvokeMethodResponse]{
			Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
		},
	)
	resp, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
		return a.getAppChannel(act.actorType).InvokeMethod(ctx, req)
	})
	if err != nil {
		return nil, err
	}

	if resp == nil {
		return nil, errors.New("error from actor service: response object is nil")
	}

	if resp.Status().Code != nethttp.StatusOK {
		respData, _ := resp.RawDataFull()
		return nil, fmt.Errorf("error from actor service: %s", string(respData))
	}

	// The .NET SDK signifies Actor failure via a header instead of a bad response.
	if _, ok := resp.Headers()["X-Daprerrorresponseheader"]; ok {
		return resp, ErrDaprResponseHeader
	}

	return resp, nil
}

func (a *actorsRuntime) getAppChannel(actorType string) channel.AppChannel {
	if a.internalActorChannel.Contains(actorType) {
		return a.internalActorChannel
	}
	return a.appChannel
}

func (a *actorsRuntime) callRemoteActor(
	ctx context.Context,
	targetAddress, targetID string,
	req *invokev1.InvokeMethodRequest,
) (*invokev1.InvokeMethodResponse, func(destroy bool), error) {
	conn, teardown, err := a.grpcConnectionFn(context.TODO(), targetAddress, targetID, a.config.Namespace)
	if err != nil {
		return nil, teardown, err
	}

	span := diagUtils.SpanFromContext(ctx)
	ctx = diag.SpanContextToGRPCMetadata(ctx, span.SpanContext())
	client := internalv1pb.NewServiceInvocationClient(conn)

	pd, err := req.ProtoWithData()
	if err != nil {
		return nil, teardown, fmt.Errorf("failed to read data from request object: %w", err)
	}
	resp, err := client.CallActor(ctx, pd)
	if err != nil {
		return nil, teardown, err
	}

	invokeResponse, invokeErr := invokev1.InternalInvokeResponse(resp)
	if invokeErr != nil {
		return nil, teardown, invokeErr
	}

	// Generated gRPC client eats the response when we send
	if _, ok := invokeResponse.Headers()["X-Daprerrorresponseheader"]; ok {
		return invokeResponse, teardown, ErrDaprResponseHeader
	}

	return invokeResponse, teardown, nil
}

func (a *actorsRuntime) isActorLocal(targetActorAddress, hostAddress string, grpcPort int) bool {
	return strings.Contains(targetActorAddress, "localhost") || strings.Contains(targetActorAddress, "127.0.0.1") ||
		targetActorAddress == hostAddress+":"+strconv.Itoa(grpcPort)
}

func (a *actorsRuntime) GetState(ctx context.Context, req *GetStateRequest) (*StateResponse, error) {
	if a.store == nil {
		return nil, errors.New("actors: state store does not exist or incorrectly configured")
	}

	partitionKey := constructCompositeKey(a.config.AppID, req.ActorType, req.ActorID)
	metadata := map[string]string{metadataPartitionKey: partitionKey}

	key := a.constructActorStateKey(req.ActorType, req.ActorID, req.Key)

	policyRunner := resiliency.NewRunner[*state.GetResponse](ctx,
		a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
	)
	storeReq := &state.GetRequest{
		Key:      key,
		Metadata: metadata,
	}
	resp, err := policyRunner(func(ctx context.Context) (*state.GetResponse, error) {
		return a.store.Get(ctx, storeReq)
	})
	if err != nil {
		return nil, err
	}

	if resp == nil {
		return &StateResponse{}, nil
	}

	return &StateResponse{
		Data: resp.Data,
	}, nil
}

func (a *actorsRuntime) TransactionalStateOperation(ctx context.Context, req *TransactionalRequest) error {
	if a.store == nil || a.transactionalStore == nil {
		return errors.New("actors: state store does not exist or incorrectly configured. Have you set the - name: actorStateStore value: \"true\" in your state store component file?")
	}
	operations := make([]state.TransactionalStateOperation, len(req.Operations))
	partitionKey := constructCompositeKey(a.config.AppID, req.ActorType, req.ActorID)
	metadata := map[string]string{metadataPartitionKey: partitionKey}
	for i, o := range req.Operations {
		switch o.Operation {
		case Upsert:
			var upsert TransactionalUpsert
			err := mapstructure.Decode(o.Request, &upsert)
			if err != nil {
				return err
			}
			key := a.constructActorStateKey(req.ActorType, req.ActorID, upsert.Key)
			operations[i] = state.TransactionalStateOperation{
				Request: state.SetRequest{
					Key:      key,
					Value:    upsert.Value,
					Metadata: metadata,
				},
				Operation: state.Upsert,
			}
		case Delete:
			var delete TransactionalDelete
			err := mapstructure.Decode(o.Request, &delete)
			if err != nil {
				return err
			}
			key := a.constructActorStateKey(req.ActorType, req.ActorID, delete.Key)
			operations[i] = state.TransactionalStateOperation{
				Request: state.DeleteRequest{
					Key:      key,
					Metadata: metadata,
				},
				Operation: state.Delete,
			}
		default:
			return fmt.Errorf("operation type %s not supported", o.Operation)
		}
	}

	policyRunner := resiliency.NewRunner[any](ctx,
		a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
	)
	stateReq := &state.TransactionalStateRequest{
		Operations: operations,
		Metadata:   metadata,
	}
	_, err := policyRunner(func(ctx context.Context) (any, error) {
		return nil, a.transactionalStore.Multi(ctx, stateReq)
	})
	return err
}

func (a *actorsRuntime) IsActorHosted(ctx context.Context, req *ActorHostedRequest) bool {
	key := constructCompositeKey(req.ActorType, req.ActorID)
	policyDef := a.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries)
	policyRunner := resiliency.NewRunner[any](ctx, policyDef)
	_, err := policyRunner(func(ctx context.Context) (any, error) {
		_, exists := a.actorsTable.Load(key)
		if !exists {
			// Error message isn't used - we just need to have an error
			return nil, errors.New("")
		}
		return nil, nil
	})
	return err == nil
}

func (a *actorsRuntime) constructActorStateKey(actorType, actorID, key string) string {
	return constructCompositeKey(a.config.AppID, actorType, actorID, key)
}

func (a *actorsRuntime) drainRebalancedActors() {
	// visit all currently active actors.
	var wg sync.WaitGroup

	a.actorsTable.Range(func(key interface{}, value interface{}) bool {
		wg.Add(1)
		go func(key interface{}, value interface{}, wg *sync.WaitGroup) {
			defer wg.Done()
			// for each actor, deactivate if no longer hosted locally
			actorKey := key.(string)
			actorType, actorID := a.getActorTypeAndIDFromKey(actorKey)
			address, _ := a.placement.LookupActor(actorType, actorID)
			if address != "" && !a.isActorLocal(address, a.config.HostAddress, a.config.Port) {
				// actor has been moved to a different host, deactivate when calls are done cancel any reminders
				// each item in reminders contain a struct with some metadata + the actual reminder struct
				a.remindersLock.RLock()
				reminders := a.reminders[actorType]
				a.remindersLock.RUnlock()
				for _, r := range reminders {
					// r.reminder refers to the actual reminder struct that is saved in the db
					if r.reminder.ActorType == actorType && r.reminder.ActorID == actorID {
						reminderKey := constructCompositeKey(actorKey, r.reminder.Name)
						stopChan, exists := a.activeReminders.Load(reminderKey)
						if exists {
							close(stopChan.(chan bool))
							a.activeReminders.Delete(reminderKey)
						}
					}
				}

				actor := value.(*actor)
				if a.config.GetDrainRebalancedActorsForType(actorType) {
					// wait until actor isn't busy or timeout hits
					if actor.isBusy() {
						select {
						case <-a.clock.After(a.config.DrainOngoingCallTimeout):
							break
						case <-actor.channel():
							// if a call comes in from the actor for state changes, that's still allowed
							break
						}
					}
				}

				// don't allow state changes
				a.actorsTable.Delete(key)

				diag.DefaultMonitoring.ActorRebalanced(actorType)

				for {
					// wait until actor is not busy, then deactivate
					if !actor.isBusy() {
						err := a.deactivateActor(actorType, actorID)
						if err != nil {
							log.Errorf("failed to deactivate actor %s: %s", actorKey, err)
						}
						break
					}
					a.clock.Sleep(time.Millisecond * 500)
				}
			}
		}(key, value, &wg)
		return true
	})

	wg.Wait()
}

func (a *actorsRuntime) evaluateReminders() {
	a.evaluationLock.Lock()
	defer a.evaluationLock.Unlock()

	a.evaluationChan <- struct{}{}

	var wg sync.WaitGroup
	for _, t := range a.config.HostedActorTypes {
		vals, _, err := a.getRemindersForActorType(t, true)
		if err != nil {
			log.Errorf("error getting reminders for actor type %s: %s", t, err)
		} else {
			log.Debugf("loaded %d reminders for actor type %s", len(vals), t)
			a.remindersLock.Lock()
			a.reminders[t] = vals
			a.remindersLock.Unlock()

			wg.Add(1)
			go func(wg *sync.WaitGroup, reminders []actorReminderReference) {
				defer wg.Done()

				for i := range reminders {
					r := reminders[i] // Make a copy since we will refer to this as a reference in this loop.
					targetActorAddress, _ := a.placement.LookupActor(r.reminder.ActorType, r.reminder.ActorID)
					if targetActorAddress == "" {
						log.Warnf("did not find address for actor ID %s and actor type %s in reminder %s",
							r.reminder.ActorID,
							r.reminder.ActorType,
							r.reminder.Name)
						continue
					}

					actorKey := constructCompositeKey(r.reminder.ActorType, r.reminder.ActorID)
					reminderKey := constructCompositeKey(actorKey, r.reminder.Name)
					if a.isActorLocal(targetActorAddress, a.config.HostAddress, a.config.Port) {
						_, exists := a.activeReminders.Load(reminderKey)

						if !exists {
							stop := make(chan bool)
							a.activeReminders.Store(reminderKey, stop)
							err := a.startReminder(&r.reminder, stop)
							if err != nil {
								log.Errorf("error starting reminder: %s", err)
							} else {
								log.Debugf("started reminder %s for actor ID %s and actor type %s",
									r.reminder.Name,
									r.reminder.ActorID,
									r.reminder.ActorType)
							}
						} else {
							log.Debugf("reminder %s already exists for actor ID %s and actor type %s",
								r.reminder.Name,
								r.reminder.ActorID,
								r.reminder.ActorType)
						}
					} else {
						stopChan, exists := a.activeReminders.Load(reminderKey)
						if exists {
							log.Debugf("stopping reminder %s on %s as it's active on host %s", reminderKey, a.config.HostAddress, targetActorAddress)
							close(stopChan.(chan bool))
							a.activeReminders.Delete(reminderKey)
						}
					}
				}
			}(&wg, vals)
		}
	}
	wg.Wait()
	<-a.evaluationChan
}

func (a *actorsRuntime) getReminderTrack(actorKey, name string) (*ReminderTrack, error) {
	if a.store == nil {
		return nil, errors.New("actors: state store does not exist or incorrectly configured")
	}

	policyRunner := resiliency.NewRunner[*state.GetResponse](context.TODO(),
		a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
	)
	storeReq := &state.GetRequest{
		Key: constructCompositeKey(actorKey, name),
	}
	resp, err := policyRunner(func(ctx context.Context) (*state.GetResponse, error) {
		return a.store.Get(ctx, storeReq)
	})
	if err != nil {
		return nil, err
	}

	if resp == nil {
		resp = &state.GetResponse{}
	}
	track := &ReminderTrack{
		RepetitionLeft: -1,
	}
	_ = json.Unmarshal(resp.Data, track)
	track.Etag = resp.ETag
	return track, nil
}

func (a *actorsRuntime) updateReminderTrack(actorKey, name string, repetition int, lastInvokeTime time.Time, etag *string) error {
	if a.store == nil {
		return errors.New("actors: state store does not exist or incorrectly configured")
	}

	track := ReminderTrack{
		LastFiredTime:  lastInvokeTime.Format(time.RFC3339),
		RepetitionLeft: repetition,
	}

	policyRunner := resiliency.NewRunner[any](context.TODO(),
		a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
	)
	setReq := &state.SetRequest{
		Key:   constructCompositeKey(actorKey, name),
		Value: track,
		ETag:  etag,
		Options: state.SetStateOption{
			Concurrency: state.FirstWrite,
		},
	}
	_, err := policyRunner(func(ctx context.Context) (any, error) {
		return nil, a.store.Set(ctx, setReq)
	})
	return err
}

func (a *actorsRuntime) startReminder(reminder *Reminder, stopChannel chan bool) error {
	actorKey := constructCompositeKey(reminder.ActorType, reminder.ActorID)
	reminderKey := constructCompositeKey(actorKey, reminder.Name)

	var (
		nextTime, ttl            time.Time
		period                   time.Duration
		years, months, days      int
		repeats, repetitionsLeft int
		eTag                     *string
	)

	registeredTime, err := time.Parse(time.RFC3339, reminder.RegisteredTime)
	if err != nil {
		return fmt.Errorf("error parsing reminder registered time: %w", err)
	}
	if len(reminder.ExpirationTime) != 0 {
		if ttl, err = time.Parse(time.RFC3339, reminder.ExpirationTime); err != nil {
			return fmt.Errorf("error parsing reminder expiration time: %w", err)
		}
	}

	repeats = -1 // set to default
	if len(reminder.Period) != 0 {
		if years, months, days, period, repeats, err = timeutils.ParseDuration(reminder.Period); err != nil {
			return fmt.Errorf("error parsing reminder period: %w", err)
		}
	}

	track, err := a.getReminderTrack(actorKey, reminder.Name)
	if err != nil {
		return fmt.Errorf("error getting reminder track: %w", err)
	}

	if track != nil && len(track.LastFiredTime) != 0 {
		lastFiredTime, err := time.Parse(time.RFC3339, track.LastFiredTime)
		if err != nil {
			return fmt.Errorf("error parsing reminder last fired time: %w", err)
		}

		repetitionsLeft = track.RepetitionLeft
		nextTime = lastFiredTime.AddDate(years, months, days).Add(period)
	} else {
		repetitionsLeft = repeats
		nextTime = registeredTime
	}
	eTag = track.Etag

	go func(reminder *Reminder, years int, months int, days int, period time.Duration, nextTime, ttl time.Time, repetitionsLeft int, eTag *string, stop chan bool) {
		var (
			ttlTimer, nextTimer *clocklib.Timer
			ttlTimerC           <-chan time.Time
			err                 error
		)
		if !ttl.IsZero() {
			ttlTimer = a.clock.Timer(a.clock.Until(ttl))
			ttlTimerC = ttlTimer.C
		}
		nextTimer = a.clock.Timer(a.clock.Until(nextTime))
		defer func() {
			if nextTimer.Stop() {
				<-nextTimer.C
			}
			if ttlTimer != nil && ttlTimer.Stop() {
				<-ttlTimerC
			}
		}()
	L:
		for {
			select {
			case <-nextTimer.C:
				// noop
			case <-ttlTimerC:
				// proceed with reminder deletion
				log.Infof("reminder %s has expired", reminder.Name)
				break L
			case <-stop:
				// reminder has been already deleted
				log.Infof("reminder %s with parameters: dueTime: %s, period: %s, data: %v has been deleted.", reminder.Name, reminder.RegisteredTime, reminder.Period, reminder.Data)
				return
			}

			_, exists := a.activeReminders.Load(reminderKey)
			if !exists {
				log.Errorf("could not find active reminder with key: %s", reminderKey)
				return
			}
			// if all repetitions are completed, proceed with reminder deletion
			if repetitionsLeft == 0 {
				log.Infof("reminder %q has completed %d repetitions", reminder.Name, repeats)
				break L
			}
			if err = a.executeReminder(reminder); err != nil {
				if errors.Is(err, ErrReminderCanceled) {
					// The handler is explicitly canceling the timer
					log.Infof("reminder %q was canceled by the actor", reminder.Name)
					break L
				} else {
					log.Errorf("error execution of reminder %q for actor type %s with id %s: %v",
						reminder.Name, reminder.ActorType, reminder.ActorID, err)
				}
			}
			if repetitionsLeft > 0 {
				repetitionsLeft--
			}

			_, exists = a.activeReminders.Load(reminderKey)
			if exists {
				if err = a.updateReminderTrack(actorKey, reminder.Name, repetitionsLeft, nextTime, eTag); err != nil {
					log.Errorf("error updating reminder track: %v", err)
				}
				track, gErr := a.getReminderTrack(actorKey, reminder.Name)
				if gErr != nil {
					log.Errorf("error retrieving reminder: %v", gErr)
				} else {
					eTag = track.Etag
				}
			} else {
				log.Errorf("could not find active reminder with key: %s", reminderKey)
				return
			}

			// if reminder is not repetitive, proceed with reminder deletion
			if years == 0 && months == 0 && days == 0 && period == 0 {
				break L
			}
			nextTime = nextTime.AddDate(years, months, days).Add(period)
			if nextTimer.Stop() {
				<-nextTimer.C
			}
			nextTimer.Reset(a.clock.Until(nextTime))
		}
		err = a.DeleteReminder(context.TODO(), &DeleteReminderRequest{
			Name:      reminder.Name,
			ActorID:   reminder.ActorID,
			ActorType: reminder.ActorType,
		})
		if err != nil {
			log.Errorf("error deleting reminder: %s", err)
		}
	}(reminder, years, months, days, period, nextTime, ttl, repetitionsLeft, eTag, stopChannel)

	return nil
}

func (a *actorsRuntime) executeReminder(reminder *Reminder) error {
	r := ReminderResponse{
		DueTime: reminder.DueTime,
		Period:  reminder.Period,
		Data:    reminder.Data,
	}
	b, err := json.Marshal(&r)
	if err != nil {
		return err
	}

	policyDef := a.resiliency.ActorPreLockPolicy(reminder.ActorType, reminder.ActorID)

	log.Debugf("executing reminder %s for actor type %s with id %s", reminder.Name, reminder.ActorType, reminder.ActorID)
	req := invokev1.NewInvokeMethodRequest("remind/"+reminder.Name).
		WithActor(reminder.ActorType, reminder.ActorID).
		WithRawDataBytes(b).
		WithContentType(invokev1.JSONContentType)
	if policyDef != nil {
		req.WithReplay(policyDef.HasRetries())
	}
	defer req.Close()

	policyRunner := resiliency.NewRunnerWithOptions(context.TODO(), policyDef,
		resiliency.RunnerOpts[*invokev1.InvokeMethodResponse]{
			Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
		},
	)
	imr, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
		return a.callLocalActor(ctx, req)
	})
	if err != nil && !errors.Is(err, ErrReminderCanceled) {
		log.Errorf("error executing reminder %s for actor type %s with id %s: %v", reminder.Name, reminder.ActorType, reminder.ActorID, err)
	}
	if imr != nil {
		_ = imr.Close()
	}
	return err
}

func (a *actorsRuntime) reminderRequiresUpdate(req *CreateReminderRequest, reminder *Reminder) bool {
	return reminder.ActorID == req.ActorID &&
		reminder.ActorType == req.ActorType &&
		reminder.Name == req.Name &&
		(!reflect.DeepEqual(reminder.Data, req.Data) ||
			reminder.DueTime != req.DueTime ||
			reminder.Period != req.Period ||
			len(req.TTL) != 0 ||
			(len(reminder.ExpirationTime) != 0 && len(req.TTL) == 0))
}

func (a *actorsRuntime) getReminder(reminderName string, actorType string, actorID string) (*Reminder, bool) {
	a.remindersLock.RLock()
	reminders := a.reminders[actorType]
	a.remindersLock.RUnlock()

	for _, r := range reminders {
		if r.reminder.ActorID == actorID && r.reminder.ActorType == actorType && r.reminder.Name == reminderName {
			return &r.reminder, true
		}
	}

	return nil, false
}

func (m *ActorMetadata) calculateReminderPartition(actorID, reminderName string) uint32 {
	if m.RemindersMetadata.PartitionCount <= 0 {
		return 0
	}

	// do not change this hash function because it would be a breaking change.
	h := fnv.New32a()
	h.Write([]byte(actorID))
	h.Write([]byte(reminderName))
	return (h.Sum32() % uint32(m.RemindersMetadata.PartitionCount)) + 1
}

func (m *ActorMetadata) createReminderReference(reminder Reminder) actorReminderReference {
	if m.RemindersMetadata.PartitionCount > 0 {
		return actorReminderReference{
			actorMetadataID:           m.ID,
			actorRemindersPartitionID: m.calculateReminderPartition(reminder.ActorID, reminder.Name),
			reminder:                  reminder,
		}
	}

	return actorReminderReference{
		actorMetadataID:           metadataZeroID,
		actorRemindersPartitionID: 0,
		reminder:                  reminder,
	}
}

func (m *ActorMetadata) calculateRemindersStateKey(actorType string, remindersPartitionID uint32) string {
	if remindersPartitionID == 0 {
		return constructCompositeKey("actors", actorType)
	}

	return constructCompositeKey(
		"actors",
		actorType,
		m.ID,
		"reminders",
		strconv.Itoa(int(remindersPartitionID)))
}

func (m *ActorMetadata) calculateEtag(partitionID uint32) *string {
	return m.RemindersMetadata.partitionsEtag[partitionID]
}

func (m *ActorMetadata) removeReminderFromPartition(reminderRefs []actorReminderReference, actorType, actorID, reminderName string) ([]Reminder, string, *string) {
	// First, we find the partition
	var partitionID uint32
	if m.RemindersMetadata.PartitionCount > 0 {
		for _, reminderRef := range reminderRefs {
			if reminderRef.reminder.ActorType == actorType && reminderRef.reminder.ActorID == actorID && reminderRef.reminder.Name == reminderName {
				partitionID = reminderRef.actorRemindersPartitionID
			}
		}
	}

	var remindersInPartitionAfterRemoval []Reminder
	for _, reminderRef := range reminderRefs {
		if reminderRef.reminder.ActorType == actorType && reminderRef.reminder.ActorID == actorID && reminderRef.reminder.Name == reminderName {
			continue
		}

		// Only the items in the partition to be updated.
		if reminderRef.actorRemindersPartitionID == partitionID {
			remindersInPartitionAfterRemoval = append(remindersInPartitionAfterRemoval, reminderRef.reminder)
		}
	}

	stateKey := m.calculateRemindersStateKey(actorType, partitionID)
	return remindersInPartitionAfterRemoval, stateKey, m.calculateEtag(partitionID)
}

func (m *ActorMetadata) insertReminderInPartition(reminderRefs []actorReminderReference, reminder Reminder) ([]Reminder, actorReminderReference, string, *string) {
	newReminderRef := m.createReminderReference(reminder)

	var remindersInPartitionAfterInsertion []Reminder
	for _, reminderRef := range reminderRefs {
		// Only the items in the partition to be updated.
		if reminderRef.actorRemindersPartitionID == newReminderRef.actorRemindersPartitionID {
			remindersInPartitionAfterInsertion = append(remindersInPartitionAfterInsertion, reminderRef.reminder)
		}
	}

	remindersInPartitionAfterInsertion = append(remindersInPartitionAfterInsertion, reminder)

	stateKey := m.calculateRemindersStateKey(newReminderRef.reminder.ActorType, newReminderRef.actorRemindersPartitionID)
	return remindersInPartitionAfterInsertion, newReminderRef, stateKey, m.calculateEtag(newReminderRef.actorRemindersPartitionID)
}

func (m *ActorMetadata) calculateDatabasePartitionKey(stateKey string) string {
	if m.RemindersMetadata.PartitionCount > 0 {
		return m.ID
	}

	return stateKey
}

func (a *actorsRuntime) waitForEvaluationChan() bool {
	t := a.clock.Timer(5 * time.Second)
	defer t.Stop()
	select {
	case <-a.ctx.Done():
		return false
	case <-t.C:
		return false
	case a.evaluationChan <- struct{}{}:
		<-a.evaluationChan
	}
	return true
}

func (a *actorsRuntime) CreateReminder(ctx context.Context, req *CreateReminderRequest) error {
	if a.store == nil {
		return errors.New("actors: state store does not exist or incorrectly configured")
	}

	a.activeRemindersLock.Lock()
	defer a.activeRemindersLock.Unlock()
	if r, exists := a.getReminder(req.Name, req.ActorType, req.ActorID); exists {
		if a.reminderRequiresUpdate(req, r) {
			err := a.doDeleteReminder(ctx, &DeleteReminderRequest{
				ActorID:   req.ActorID,
				ActorType: req.ActorType,
				Name:      req.Name,
			})
			if err != nil {
				return err
			}
		} else {
			return nil
		}
	}

	if !a.waitForEvaluationChan() {
		return errors.New("error creating reminder: timed out after 5s")
	}

	now := a.clock.Now()
	reminder := Reminder{
		ActorID:   req.ActorID,
		ActorType: req.ActorType,
		Name:      req.Name,
		Data:      req.Data,
		Period:    req.Period,
		DueTime:   req.DueTime,
	}

	// check input correctness
	var (
		dueTime, ttl time.Time
		repeats      int
		err          error
	)
	if len(req.DueTime) != 0 {
		if dueTime, err = timeutils.ParseTime(req.DueTime, &now); err != nil {
			return fmt.Errorf("error parsing reminder due time: %w", err)
		}
	} else {
		dueTime = now
	}
	reminder.RegisteredTime = dueTime.Format(time.RFC3339)

	if len(req.Period) != 0 {
		_, _, _, _, repeats, err = timeutils.ParseDuration(req.Period)
		if err != nil {
			return fmt.Errorf("error parsing reminder period: %w", err)
		}
		// error on timers with zero repetitions
		if repeats == 0 {
			return fmt.Errorf("reminder %s has zero repetitions", reminder.Name)
		}
	}
	// set expiration time if configured
	if len(req.TTL) > 0 {
		if ttl, err = timeutils.ParseTime(req.TTL, &dueTime); err != nil {
			return fmt.Errorf("error parsing reminder TTL: %w", err)
		}
		// check if already expired
		if now.After(ttl) || dueTime.After(ttl) {
			return fmt.Errorf("reminder %s has already expired: registeredTime: %s TTL:%s",
				reminder.Name, reminder.RegisteredTime, req.TTL)
		}
		reminder.ExpirationTime = ttl.UTC().Format(time.RFC3339)
	}

	stop := make(chan bool)

	err = a.storeReminder(ctx, reminder, stop)
	if err != nil {
		return err
	}
	return a.startReminder(&reminder, stop)
}

func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest) error {
	var (
		err                 error
		repeats             int
		dueTime, ttl        time.Time
		period              time.Duration
		years, months, days int
	)
	a.activeTimersLock.Lock()
	defer a.activeTimersLock.Unlock()
	actorKey := constructCompositeKey(req.ActorType, req.ActorID)
	timerKey := constructCompositeKey(actorKey, req.Name)

	_, exists := a.actorsTable.Load(actorKey)
	if !exists {
		return fmt.Errorf("can't create timer for actor %s: actor not activated", actorKey)
	}

	stopChan, exists := a.activeTimers.Load(timerKey)
	if exists {
		close(stopChan.(chan bool))
	}

	now := a.clock.Now()
	if len(req.DueTime) != 0 {
		if dueTime, err = timeutils.ParseTime(req.DueTime, &now); err != nil {
			return fmt.Errorf("error parsing timer due time: %w", err)
		}
	} else {
		dueTime = now
	}

	repeats = -1 // set to default
	if len(req.Period) != 0 {
		if years, months, days, period, repeats, err = timeutils.ParseDuration(req.Period); err != nil {
			return fmt.Errorf("error parsing timer period: %w", err)
		}
		// error on timers with zero repetitions
		if repeats == 0 {
			return fmt.Errorf("timer %s has zero repetitions", timerKey)
		}
	}

	if len(req.TTL) > 0 {
		if ttl, err = timeutils.ParseTime(req.TTL, &dueTime); err != nil {
			return fmt.Errorf("error parsing timer TTL: %w", err)
		}
		if now.After(ttl) || dueTime.After(ttl) {
			return fmt.Errorf("timer %s has already expired: dueTime: %s TTL: %s", timerKey, req.DueTime, req.TTL)
		}
	}

	log.Debugf("create timer %q dueTime:%s period:%s repeats:%d ttl:%s",
		req.Name, dueTime.String(), period.String(), repeats, ttl.String())
	stop := make(chan bool, 1)
	a.activeTimers.Store(timerKey, stop)

	go func(stop chan bool, req *CreateTimerRequest) {
		var (
			ttlTimer, nextTimer *clocklib.Timer
			ttlTimerC           <-chan time.Time
			err                 error
		)
		if !ttl.IsZero() {
			ttlTimer = a.clock.Timer(a.clock.Until(ttl))
			ttlTimerC = ttlTimer.C
		}
		nextTime := dueTime
		nextTimer = a.clock.Timer(a.clock.Until(nextTime))
		defer func() {
			if nextTimer.Stop() {
				<-nextTimer.C
			}
			if ttlTimer != nil && ttlTimer.Stop() {
				<-ttlTimerC
			}
		}()
	L:
		for {
			select {
			case <-nextTimer.C:
				// noop
			case <-ttlTimerC:
				// timer has expired; proceed with deletion
				log.Infof("timer %s with parameters: dueTime: %s, period: %s, TTL: %s, data: %v has expired.", timerKey, req.DueTime, req.Period, req.TTL, req.Data)
				break L
			case <-stop:
				// timer has been already deleted
				log.Infof("timer %s with parameters: dueTime: %s, period: %s, TTL: %s, data: %v has been deleted.", timerKey, req.DueTime, req.Period, req.TTL, req.Data)
				return
			}

			if _, exists := a.actorsTable.Load(actorKey); exists {
				if err = a.executeTimer(req.ActorType, req.ActorID, req.Name, req.DueTime, req.Period, req.Callback, req.Data); err != nil {
					log.Errorf("error invoking timer on actor %s: %s", actorKey, err)
				}
				if repeats > 0 {
					repeats--
				}
			} else {
				log.Errorf("could not find active timer %s", timerKey)
				return
			}
			if repeats == 0 || (years == 0 && months == 0 && days == 0 && period == 0) {
				log.Infof("timer %s has been completed", timerKey)
				break L
			}
			nextTime = nextTime.AddDate(years, months, days).Add(period)
			if nextTimer.Stop() {
				<-nextTimer.C
			}
			nextTimer.Reset(a.clock.Until(nextTime))
		}
		err = a.DeleteTimer(ctx, &DeleteTimerRequest{
			Name:      req.Name,
			ActorID:   req.ActorID,
			ActorType: req.ActorType,
		})
		if err != nil {
			log.Errorf("error deleting timer %s: %v", timerKey, err)
		}
	}(stop, req)
	return nil
}

func (a *actorsRuntime) executeTimer(actorType, actorID, name, dueTime, period, callback string, data interface{}) error {
	t := TimerResponse{
		Callback: callback,
		Data:     data,
		DueTime:  dueTime,
		Period:   period,
	}
	b, err := json.Marshal(&t)
	if err != nil {
		return err
	}

	policyDef := a.resiliency.ActorPreLockPolicy(actorType, actorID)

	log.Debugf("executing timer %s for actor type %s with id %s", name, actorType, actorID)
	req := invokev1.NewInvokeMethodRequest("timer/"+name).
		WithActor(actorType, actorID).
		WithRawDataBytes(b).
		WithContentType(invokev1.JSONContentType)
	if policyDef != nil {
		req.WithReplay(policyDef.HasRetries())
	}
	defer req.Close()

	policyRunner := resiliency.NewRunnerWithOptions(context.TODO(), policyDef,
		resiliency.RunnerOpts[*invokev1.InvokeMethodResponse]{
			Disposer: resiliency.DisposerCloser[*invokev1.InvokeMethodResponse],
		},
	)
	imr, err := policyRunner(func(ctx context.Context) (*invokev1.InvokeMethodResponse, error) {
		return a.callLocalActor(ctx, req)
	})
	if err != nil {
		log.Errorf("error executing timer %s for actor type %s with id %s: %v", name, actorType, actorID, err)
	}
	if imr != nil {
		_ = imr.Close()
	}
	return err
}

func (a *actorsRuntime) saveActorTypeMetadata(actorType string, actorMetadata *ActorMetadata) error {
	setReq := &state.SetRequest{
		Key:   constructCompositeKey("actors", actorType, "metadata"),
		Value: actorMetadata,
		ETag:  actorMetadata.Etag,
		Options: state.SetStateOption{
			Concurrency: state.FirstWrite,
		},
	}
	policyRunner := resiliency.NewRunner[any](context.TODO(),
		a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
	)
	_, err := policyRunner(func(ctx context.Context) (any, error) {
		return nil, a.store.Set(ctx, setReq)
	})
	return err
}

func (a *actorsRuntime) getActorTypeMetadata(actorType string, migrate bool) (result *ActorMetadata, err error) {
	if a.store == nil {
		return nil, errors.New("actors: state store does not exist or incorrectly configured")
	}

	var policyDef *resiliency.PolicyDefinition
	if !a.resiliency.PolicyDefined(a.storeName, resiliency.ComponentOutboundPolicy) {
		// If there is no policy defined, wrap the whole logic in the built-in.
		policyDef = a.resiliency.BuiltInPolicy(resiliency.BuiltInActorReminderRetries)
	} else {
		// Else, we can rely on the underlying operations all being covered by resiliency.
		noOp := resiliency.NoOp{}
		policyDef = noOp.EndpointPolicy("", "")
	}
	policyRunner := resiliency.NewRunner[*ActorMetadata](context.TODO(), policyDef)
	getReq := &state.GetRequest{
		Key: constructCompositeKey("actors", actorType, "metadata"),
	}
	return policyRunner(func(ctx context.Context) (*ActorMetadata, error) {
		rResp, rErr := a.store.Get(ctx, getReq)
		if rErr != nil {
			return nil, rErr
		}
		actorMetadata := &ActorMetadata{
			ID: metadataZeroID,
			RemindersMetadata: ActorRemindersMetadata{
				partitionsEtag: nil,
				PartitionCount: 0,
			},
			Etag: nil,
		}
		if len(rResp.Data) > 0 {
			rErr = json.Unmarshal(rResp.Data, actorMetadata)
			if rErr != nil {
				return nil, fmt.Errorf("could not parse metadata for actor type %s (%s): %w", actorType, string(rResp.Data), rErr)
			}
			actorMetadata.Etag = rResp.ETag
		}

		if migrate {
			rErr = a.migrateRemindersForActorType(actorType, actorMetadata)
			if rErr != nil {
				return nil, rErr
			}
		}

		return actorMetadata, nil
	})
}

func (a *actorsRuntime) migrateRemindersForActorType(actorType string, actorMetadata *ActorMetadata) error {
	reminderPartitionCount := a.config.GetRemindersPartitionCountForType(actorType)
	if actorMetadata.RemindersMetadata.PartitionCount == reminderPartitionCount {
		return nil
	}

	if actorMetadata.RemindersMetadata.PartitionCount > reminderPartitionCount {
		log.Warnf("cannot decrease number of partitions for reminders of actor type %s", actorType)
		return nil
	}

	// Nice to have: avoid conflicting migration within the same process.
	a.remindersMigrationLock.Lock()
	defer a.remindersMigrationLock.Unlock()
	log.Warnf("migrating actor metadata record for actor type %s", actorType)

	// Fetch all reminders for actor type.
	reminderRefs, refreshedActorMetadata, err := a.getRemindersForActorType(actorType, false)
	if err != nil {
		return err
	}
	if refreshedActorMetadata.ID != actorMetadata.ID {
		return fmt.Errorf("could not migrate reminders for actor type %s due to race condition in actor metadata", actorType)
	}

	log.Infof("migrating %d reminders for actor type %s", len(reminderRefs), actorType)
	*actorMetadata = *refreshedActorMetadata

	// Recreate as a new metadata identifier.
	actorMetadata.ID = uuid.NewString()
	actorMetadata.RemindersMetadata.PartitionCount = reminderPartitionCount
	actorRemindersPartitions := make([][]Reminder, actorMetadata.RemindersMetadata.PartitionCount)
	for i := 0; i < actorMetadata.RemindersMetadata.PartitionCount; i++ {
		actorRemindersPartitions[i] = make([]Reminder, 0)
	}

	// Recalculate partition for each reminder.
	for _, reminderRef := range reminderRefs {
		partitionID := actorMetadata.calculateReminderPartition(reminderRef.reminder.ActorID, reminderRef.reminder.Name)
		actorRemindersPartitions[partitionID-1] = append(actorRemindersPartitions[partitionID-1], reminderRef.reminder)
	}

	// Save to database.
	for i := 0; i < actorMetadata.RemindersMetadata.PartitionCount; i++ {
		partitionID := i + 1
		stateKey := actorMetadata.calculateRemindersStateKey(actorType, uint32(partitionID))
		stateValue := actorRemindersPartitions[i]
		err = a.saveRemindersInPartition(context.TODO(), stateKey, stateValue, nil, actorMetadata.ID)
		if err != nil {
			return err
		}
	}

	// Save new metadata so the new "metadataID" becomes the new de factor referenced list for reminders.
	err = a.saveActorTypeMetadata(actorType, actorMetadata)
	if err != nil {
		return err
	}
	log.Warnf(
		"completed actor metadata record migration for actor type %s, new metadata ID = %s",
		actorType, actorMetadata.ID)
	return nil
}

type bulkGetRes struct {
	bulkGet      bool
	bulkResponse []state.BulkGetResponse
}

func (a *actorsRuntime) getRemindersForActorType(actorType string, migrate bool) ([]actorReminderReference, *ActorMetadata, error) {
	if a.store == nil {
		return nil, nil, errors.New("actors: state store does not exist or incorrectly configured")
	}

	actorMetadata, merr := a.getActorTypeMetadata(actorType, migrate)
	if merr != nil {
		return nil, nil, fmt.Errorf("could not read actor type metadata: %w", merr)
	}

	policyDef := a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore)

	log.Debugf(
		"starting to read reminders for actor type %s (migrate=%t), with metadata id %s and %d partitions",
		actorType, migrate, actorMetadata.ID, actorMetadata.RemindersMetadata.PartitionCount)
	if actorMetadata.RemindersMetadata.PartitionCount >= 1 {
		metadata := map[string]string{metadataPartitionKey: actorMetadata.ID}
		actorMetadata.RemindersMetadata.partitionsEtag = map[uint32]*string{}
		reminders := []actorReminderReference{}

		keyPartitionMap := map[string]uint32{}
		getRequests := []state.GetRequest{}
		for i := 1; i <= actorMetadata.RemindersMetadata.PartitionCount; i++ {
			partition := uint32(i)
			key := actorMetadata.calculateRemindersStateKey(actorType, partition)
			keyPartitionMap[key] = partition
			getRequests = append(getRequests, state.GetRequest{
				Key:      key,
				Metadata: metadata,
			})
		}

		policyRunner := resiliency.NewRunner[*bulkGetRes](context.TODO(), policyDef)
		bgr, err := policyRunner(func(ctx context.Context) (*bulkGetRes, error) {
			rBulkGet, rBulkResponse, rErr := a.store.BulkGet(ctx, getRequests)
			if rErr != nil {
				return &bulkGetRes{}, rErr
			}
			return &bulkGetRes{
				bulkGet:      rBulkGet,
				bulkResponse: rBulkResponse,
			}, nil
		})
		if bgr == nil {
			bgr = &bulkGetRes{}
		}
		if bgr.bulkGet {
			if err != nil {
				return nil, nil, err
			}
		} else {
			// TODO(artursouza): refactor this fallback into default implementation in contrib.
			// if store doesn't support bulk get, fallback to call get() method one by one
			limiter := concurrency.NewLimiter(actorMetadata.RemindersMetadata.PartitionCount)
			bgr.bulkResponse = make([]state.BulkGetResponse, len(getRequests))
			for i := range getRequests {
				getRequest := getRequests[i]
				bgr.bulkResponse[i].Key = getRequest.Key

				fn := func(param interface{}) {
					r := param.(*state.BulkGetResponse)
					policyRunner := resiliency.NewRunner[*state.GetResponse](context.TODO(), policyDef)
					resp, ferr := policyRunner(func(ctx context.Context) (*state.GetResponse, error) {
						return a.store.Get(ctx, &getRequest)
					})
					if ferr != nil {
						r.Error = ferr.Error()
						return
					}

					if resp == nil || len(resp.Data) == 0 {
						r.Error = "data not found for reminder partition"
						return
					}

					r.Data = json.RawMessage(resp.Data)
					r.ETag = resp.ETag
					r.Metadata = resp.Metadata
				}

				limiter.Execute(fn, &bgr.bulkResponse[i])
			}
			limiter.Wait()
		}

		for _, resp := range bgr.bulkResponse {
			partition := keyPartitionMap[resp.Key]
			actorMetadata.RemindersMetadata.partitionsEtag[partition] = resp.ETag
			if resp.Error != "" {
				return nil, nil, fmt.Errorf("could not get reminders partition %v: %v", resp.Key, resp.Error)
			}

			var batch []Reminder
			if len(resp.Data) > 0 {
				err = json.Unmarshal(resp.Data, &batch)
				if err != nil {
					return nil, nil, fmt.Errorf("could not parse actor reminders partition %v: %w", resp.Key, err)
				}
			} else {
				return nil, nil, fmt.Errorf("no data found for reminder partition %v: %w", resp.Key, err)
			}

			for j := range batch {
				reminders = append(reminders, actorReminderReference{
					actorMetadataID:           actorMetadata.ID,
					actorRemindersPartitionID: partition,
					reminder:                  batch[j],
				})
			}
		}

		log.Debugf(
			"finished reading reminders for actor type %s (migrate=%t), with metadata id %s and %d partitions: total of %d reminders",
			actorType, migrate, actorMetadata.ID, actorMetadata.RemindersMetadata.PartitionCount, len(reminders))
		return reminders, actorMetadata, nil
	}

	key := constructCompositeKey("actors", actorType)
	policyRunner := resiliency.NewRunner[*state.GetResponse](context.TODO(), policyDef)
	resp, err := policyRunner(func(ctx context.Context) (*state.GetResponse, error) {
		return a.store.Get(ctx, &state.GetRequest{
			Key: key,
		})
	})
	if err != nil {
		return nil, nil, err
	}

	if resp == nil {
		resp = &state.GetResponse{}
	}
	log.Debugf("read reminders from %s without partition: %s", key, string(resp.Data))

	var reminders []Reminder
	if len(resp.Data) > 0 {
		err = json.Unmarshal(resp.Data, &reminders)
		if err != nil {
			return nil, nil, fmt.Errorf("could not parse actor reminders: %w", err)
		}
	}

	reminderRefs := make([]actorReminderReference, len(reminders))
	for j := range reminders {
		reminderRefs[j] = actorReminderReference{
			actorMetadataID:           actorMetadata.ID,
			actorRemindersPartitionID: 0,
			reminder:                  reminders[j],
		}
	}

	actorMetadata.RemindersMetadata.partitionsEtag = map[uint32]*string{
		0: resp.ETag,
	}

	log.Debugf(
		"finished reading reminders for actor type %s (migrate=%t), with metadata id %s and no partitions: total of %d reminders",
		actorType, migrate, actorMetadata.ID, len(reminderRefs))
	return reminderRefs, actorMetadata, nil
}

func (a *actorsRuntime) saveRemindersInPartition(ctx context.Context, stateKey string, reminders []Reminder, etag *string, databasePartitionKey string) error {
	// Even when data is not partitioned, the save operation is the same.
	// The only difference is stateKey.
	log.Debugf("saving %d reminders in %s ...", len(reminders), stateKey)
	policyRunner := resiliency.NewRunner[any](ctx,
		a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
	)
	req := &state.SetRequest{
		Key:      stateKey,
		Value:    reminders,
		ETag:     etag,
		Metadata: map[string]string{metadataPartitionKey: databasePartitionKey},
		Options: state.SetStateOption{
			Concurrency: state.FirstWrite,
		},
	}
	_, err := policyRunner(func(ctx context.Context) (any, error) {
		return nil, a.store.Set(ctx, req)
	})
	return err
}

func (a *actorsRuntime) DeleteReminder(ctx context.Context, req *DeleteReminderRequest) error {
	a.activeRemindersLock.Lock()
	defer a.activeRemindersLock.Unlock()
	return a.doDeleteReminder(ctx, req)
}

func (a *actorsRuntime) doDeleteReminder(ctx context.Context, req *DeleteReminderRequest) error {
	if a.store == nil {
		return errors.New("actors: state store does not exist or incorrectly configured")
	}

	if !a.waitForEvaluationChan() {
		return errors.New("error deleting reminder: timed out after 5s")
	}

	actorKey := constructCompositeKey(req.ActorType, req.ActorID)
	reminderKey := constructCompositeKey(actorKey, req.Name)

	stop, exists := a.activeReminders.Load(reminderKey)
	if exists {
		log.Infof("Found reminder with key: %v. Deleting reminder", reminderKey)
		close(stop.(chan bool))
		a.activeReminders.Delete(reminderKey)
	}

	var policyDef *resiliency.PolicyDefinition
	if !a.resiliency.PolicyDefined(a.storeName, resiliency.ComponentOutboundPolicy) {
		// If there is no policy defined, wrap the whole logic in the built-in.
		policyDef = a.resiliency.BuiltInPolicy(resiliency.BuiltInActorReminderRetries)
	} else {
		// Else, we can rely on the underlying operations all being covered by resiliency.
		noOp := resiliency.NoOp{}
		policyDef = noOp.EndpointPolicy("", "")
	}
	policyRunner := resiliency.NewRunner[any](ctx, policyDef)
	_, err := policyRunner(func(ctx context.Context) (any, error) {
		reminders, actorMetadata, rErr := a.getRemindersForActorType(req.ActorType, false)
		if rErr != nil {
			return nil, rErr
		}

		// remove from partition first.
		remindersInPartition, stateKey, etag := actorMetadata.removeReminderFromPartition(reminders, req.ActorType, req.ActorID, req.Name)

		// now, we can remove from the "global" list.
		for i := len(reminders) - 1; i >= 0; i-- {
			if reminders[i].reminder.ActorType == req.ActorType && reminders[i].reminder.ActorID == req.ActorID && reminders[i].reminder.Name == req.Name {
				reminders = append(reminders[:i], reminders[i+1:]...)
			}
		}

		// Get the database partiton key (needed for CosmosDB)
		databasePartitionKey := actorMetadata.calculateDatabasePartitionKey(stateKey)

		// Then, save the partition to the database.
		rErr = a.saveRemindersInPartition(ctx, stateKey, remindersInPartition, etag, databasePartitionKey)
		if rErr != nil {
			return nil, rErr
		}

		// Finally, we must save metadata to get a new eTag.
		// This avoids a race condition between an update and a repartitioning.
		rErr = a.saveActorTypeMetadata(req.ActorType, actorMetadata)
		if rErr != nil {
			return nil, rErr
		}

		a.remindersLock.Lock()
		a.reminders[req.ActorType] = reminders
		a.remindersLock.Unlock()
		return nil, nil
	})
	if err != nil {
		return err
	}

	deletePolicyRunner := resiliency.NewRunner[any](ctx,
		a.resiliency.ComponentOutboundPolicy(a.storeName, resiliency.Statestore),
	)
	deleteReq := &state.DeleteRequest{
		Key: reminderKey,
	}
	_, err = deletePolicyRunner(func(ctx context.Context) (any, error) {
		return nil, a.store.Delete(ctx, deleteReq)
	})
	return err
}

// Deprecated: Currently RenameReminder renames by deleting-then-inserting-again.
// This implementation is not fault-tolerant, as a failed insert after deletion would result in no reminder
func (a *actorsRuntime) RenameReminder(ctx context.Context, req *RenameReminderRequest) error {
	log.Warn("[DEPRECATION NOTICE] Currently RenameReminder renames by deleting-then-inserting-again. This implementation is not fault-tolerant, as a failed insert after deletion would result in no reminder")

	if a.store == nil {
		return errors.New("actors: state store does not exist or incorrectly configured")
	}

	a.activeRemindersLock.Lock()
	defer a.activeRemindersLock.Unlock()

	oldReminder, exists := a.getReminder(req.OldName, req.ActorType, req.ActorID)
	if !exists {
		return nil
	}

	// delete old reminder
	err := a.doDeleteReminder(ctx, &DeleteReminderRequest{
		ActorID:   req.ActorID,
		ActorType: req.ActorType,
		Name:      req.OldName,
	})
	if err != nil {
		return err
	}

	if !a.waitForEvaluationChan() {
		return errors.New("error renaming reminder: timed out after 5s")
	}

	reminder := Reminder{
		ActorID:        req.ActorID,
		ActorType:      req.ActorType,
		Name:           req.NewName,
		Data:           oldReminder.Data,
		Period:         oldReminder.Period,
		DueTime:        oldReminder.DueTime,
		RegisteredTime: oldReminder.RegisteredTime,
		ExpirationTime: oldReminder.ExpirationTime,
	}

	stop := make(chan bool)

	err = a.storeReminder(ctx, reminder, stop)
	if err != nil {
		return err
	}

	return a.startReminder(&reminder, stop)
}

func (a *actorsRuntime) storeReminder(ctx context.Context, reminder Reminder, stopChannel chan bool) error {
	// Store the reminder in active reminders list
	actorKey := constructCompositeKey(reminder.ActorType, reminder.ActorID)
	reminderKey := constructCompositeKey(actorKey, reminder.Name)

	a.activeReminders.Store(reminderKey, stopChannel)

	var policyDef *resiliency.PolicyDefinition
	if !a.resiliency.PolicyDefined(a.storeName, resiliency.ComponentOutboundPolicy) {
		// If there is no policy defined, wrap the whole logic in the built-in.
		policyDef = a.resiliency.BuiltInPolicy(resiliency.BuiltInActorReminderRetries)
	} else {
		// Else, we can rely on the underlying operations all being covered by resiliency.
		noOp := resiliency.NoOp{}
		policyDef = noOp.EndpointPolicy("", "")
	}
	policyRunner := resiliency.NewRunner[any](ctx, policyDef)
	_, err := policyRunner(func(ctx context.Context) (any, error) {
		reminders, actorMetadata, rErr := a.getRemindersForActorType(reminder.ActorType, false)
		if rErr != nil {
			return nil, rErr
		}

		// First we add it to the partition list.
		remindersInPartition, reminderRef, stateKey, etag := actorMetadata.insertReminderInPartition(reminders, reminder)

		// Get the database partition key (needed for CosmosDB)
		databasePartitionKey := actorMetadata.calculateDatabasePartitionKey(stateKey)

		// Now we can add it to the "global" list.
		reminders = append(reminders, reminderRef)

		// Then, save the partition to the database.
		rErr = a.saveRemindersInPartition(ctx, stateKey, remindersInPartition, etag, databasePartitionKey)
		if rErr != nil {
			return nil, rErr
		}

		// Finally, we must save metadata to get a new eTag.
		// This avoids a race condition between an update and a repartitioning.
		errForSaveMetadata := a.saveActorTypeMetadata(reminder.ActorType, actorMetadata)
		if errForSaveMetadata != nil {
			return nil, errForSaveMetadata
		}

		a.remindersLock.Lock()
		a.reminders[reminder.ActorType] = reminders
		a.remindersLock.Unlock()
		return nil, nil
	})
	if err != nil {
		return err
	}
	return nil
}

func (a *actorsRuntime) GetReminder(ctx context.Context, req *GetReminderRequest) (*Reminder, error) {
	reminders, _, err := a.getRemindersForActorType(req.ActorType, false)
	if err != nil {
		return nil, err
	}

	for _, r := range reminders {
		if r.reminder.ActorID == req.ActorID && r.reminder.Name == req.Name {
			return &Reminder{
				Data:    r.reminder.Data,
				DueTime: r.reminder.DueTime,
				Period:  r.reminder.Period,
			}, nil
		}
	}
	return nil, nil
}

func (a *actorsRuntime) DeleteTimer(ctx context.Context, req *DeleteTimerRequest) error {
	actorKey := constructCompositeKey(req.ActorType, req.ActorID)
	timerKey := constructCompositeKey(actorKey, req.Name)

	stopChan, exists := a.activeTimers.Load(timerKey)
	if exists {
		close(stopChan.(chan bool))
		a.activeTimers.Delete(timerKey)
	}

	return nil
}

func (a *actorsRuntime) RegisterInternalActor(ctx context.Context, actorType string, actor InternalActor) error {
	if !a.haveCompatibleStorage() {
		return fmt.Errorf("unable to register internal actor '%s': %w", actorType, ErrIncompatibleStateStore)
	}

	if _, exists := a.internalActors[actorType]; exists {
		return fmt.Errorf("actor type %s already registered", actorType)
	} else {
		if err := a.internalActorChannel.AddInternalActor(actorType, actor); err != nil {
			return err
		}
		a.internalActors[actorType] = actor

		log.Debugf("registering internal actor type: %s", actorType)
		actor.SetActorRuntime(a)
		a.config.HostedActorTypes = append(a.config.HostedActorTypes, actorType)
		if a.placement != nil {
			if err := a.placement.AddHostedActorType(actorType); err != nil {
				return fmt.Errorf("error updating hosted actor types: %s", err)
			}
		}
	}
	return nil
}

func (a *actorsRuntime) GetActiveActorsCount(ctx context.Context) []ActiveActorsCount {
	actorCountMap := make(map[string]int, len(a.config.HostedActorTypes))
	for _, actorType := range a.config.HostedActorTypes {
		if !isInternalActor(actorType) {
			actorCountMap[actorType] = 0
		}
	}
	a.actorsTable.Range(func(key, value interface{}) bool {
		actorType, _ := a.getActorTypeAndIDFromKey(key.(string))
		if !isInternalActor(actorType) {
			actorCountMap[actorType]++
		}
		return true
	})

	activeActorsCount := make([]ActiveActorsCount, len(actorCountMap))
	n := 0
	for actorType, count := range actorCountMap {
		activeActorsCount[n] = ActiveActorsCount{Type: actorType, Count: count}
		n++
	}

	return activeActorsCount
}

func isInternalActor(actorType string) bool {
	return strings.HasPrefix(actorType, InternalActorTypePrefix)
}

// Stop closes all network connections and resources used in actor runtime.
func (a *actorsRuntime) Stop() {
	if a.placement != nil {
		a.placement.Stop()
	}
	if a.cancel != nil {
		a.cancel()
		a.cancel = nil
	}
}

// ValidateHostEnvironment validates that actors can be initialized properly given a set of parameters
// And the mode the runtime is operating in.
func ValidateHostEnvironment(mTLSEnabled bool, mode modes.DaprMode, namespace string) error {
	switch mode {
	case modes.KubernetesMode:
		if mTLSEnabled && namespace == "" {
			return errors.New("actors must have a namespace configured when running in Kubernetes mode")
		}
	}
	return nil
}
